package com.offcn.bigdata.spark.sql.p1

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * Spark数据的加载和落地操作
  *    加载操作使用read.load
  *         RuntimeException: file:/E:/data/spark/sql/people.json is not a Parquet file. expected magic number at tail [80,
  *         默认读取的文件格式为parquet，是一种二进制格式文件，是twitter公司的产品，也是apache的顶级项目
  *         要想指定其他格式的文件，需要在load之前添加一个操作format(文件格式)，比如format("json")
  *    落地操作使用write.save
  */
object _05SparkDataLoadAndSaveOps {

    // JDBC endpoint shared by the read (loadData) and write (main) examples.
    private val JdbcUrl = "jdbc:mysql://localhost:3306/test"

    /**
      * JDBC connection credentials shared by the read and write examples.
      *
      * NOTE(review): credentials are hard-coded for this demo; in real code
      * they should come from configuration or a secrets store.
      */
    private def jdbcProperties(): Properties = {
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "sorry")
        properties
    }

    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
            .appName("_05SparkDataLoadAndSaveOps")
            .master("local[*]")
            .getOrCreate()

        // Ensure the SparkSession is released even if a load/save step throws.
        try {
            // read.load defaults to parquet; format("json") overrides it.
            val pdf = spark.read.format("json").load("file:/E:/data/spark/sql/people.json")
            pdf.show()

            /**
              * SaveMode controls what happens when the target already exists:
              *     ErrorIfExists : default — fail if the output directory exists
              *     Append        : append to the existing data
              *     Ignore        : if the directory does not exist, save;
              *                     if it already exists, do nothing
              *     Overwrite     : delete the existing data, then save
              */
            pdf.write.mode(SaveMode.Overwrite).save("file:/E:/data/out/spark/parquet")

            pdf.write.mode(SaveMode.Ignore).format("csv").save("file:/E:/data/out/spark/csv")

            /*
            Target table DDL:
            CREATE TABLE `person` (
              `name` varchar(20),
              `age` bigint(20) DEFAULT NULL,
              `height` double DEFAULT NULL,
              `province` varchar(40)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8
             */
            pdf.write.mode(SaveMode.Append).jdbc(JdbcUrl, "person", jdbcProperties())
        } finally {
            spark.stop()
        }
    }

    /**
      * Demonstrates the built-in readers: parquet, text, orc, csv and jdbc.
      *
      * Each assignment below deliberately overwrites the previous result —
      * the point is the read API, not the data.
      *
      * @param spark an active SparkSession supplied by the caller
      */
    def loadData(spark: SparkSession): Unit = {
        var pdf = spark.read.parquet("file:/E:/data/spark/sql/users.parquet")
        pdf = spark.read.text("file:/E:/data/spark/sql/topn.txt")
        pdf = spark.read.orc("file:/E:/data/spark/sql/student.orc")

        pdf = spark.read.csv("file:/E:/data/spark/sql/country.csv").toDF("id", "country", "code")
        // https://docs.databricks.com/data/data-sources/read-csv.html
        // Option keys are matched case-insensitively by Spark; lowercase is
        // the documented convention.
        pdf = spark.read
            .option("header", "true")
            .option("delimiter", "|")
            .option("comment", "#")
            .csv("file:/E:/data/spark/sql/location-info.csv")

        // Read a whole table from JDBC into a DataFrame.
        pdf = spark.read.jdbc(JdbcUrl, "worker", jdbcProperties())
        pdf.show
    }
}
